package com.galeno.练习

import ch.hsr.geohash.GeoHash
import com.alibaba.fastjson.{JSON, JSONObject}
import com.galeno.utils.SparkUtil
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

/**
 * Practice job: pairs every app-log record with the reference entry of its geohash.
 *
 * Steps:
 *  1. Load the comma-separated reference file `data/ref_geohash` (geohash in the first
 *     column, followed by three further columns) and broadcast it as a lookup map.
 *  2. Parse each line of `data/app_log_2021-06-05.log` as JSON and derive a 6-character
 *     geohash from its `latitude` / `longitude` fields.
 *  3. Look the geohash up in the broadcast map.
 *
 * @author galeno
 * @date 2021/9/222:26
 */
object GeoHashDemo5 {

  /**
   * Entry point of the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val sc = SparkUtil.getSc

    // Load the geohash reference data as (geohash, (col1, col2, col3)).
    // NOTE(review): the three value columns are presumably province / city / region —
    // confirm against the reference file.
    val geoRdd = sc.textFile("data/ref_geohash").map { line =>
      val arr = line.split(",")
      (arr(0), (arr(1), arr(2), arr(3)))
    }

    // Keep one entry per geohash. reduceByKey combines on the map side, so duplicates are
    // dropped before the shuffle instead of being shipped and grouped first.
    val geoRDD: RDD[(String, (String, String, String))] =
      geoRdd.reduceByKey((kept, _) => kept)

    // collect() is a blocking action: the whole reference table is pulled to the driver.
    val geordd = geoRDD.collect().toMap
    val geohashBro: Broadcast[Map[String, (String, String, String)]] = sc.broadcast(geordd)

    // Load the log file.
    val logRdd: RDD[String] = sc.textFile("data/app_log_2021-06-05.log")

    // Turn every log line into a (geohash, json) pair.
    // getDouble returns a boxed value that is null when the field is missing; unboxing it
    // for the geohash call would throw a NullPointerException, so such records (and lines
    // that parse to null) are skipped.
    val logRDD: RDD[(String, JSONObject)] = logRdd.flatMap { line =>
      for {
        json <- Option(JSON.parseObject(line))
        lat  <- Option(json.getDouble("latitude"))
        lng  <- Option(json.getDouble("longitude"))
      } yield (GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 6), json)
    }

    // Pair each log record with its reference entry; None when the geohash is unknown.
    // The lookup is keyed by the geohash itself (the map is searched for the string,
    // not the string for the map's keys).
    val taggedRDD: RDD[(JSONObject, Option[(String, String, String)])] = logRDD.map {
      case (geohash, logJson) =>
        val geoMap: Map[String, (String, String, String)] = geohashBro.value
        (logJson, geoMap.get(geohash))
    }

    // NOTE(review): no action is invoked on taggedRDD yet, so the log side of the job is
    // never executed; add the intended output step (e.g. save or print) here.
  }
}
